{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Work in Progress: Load Parquet Into Redshift"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Check: \n",
    "\n",
    "https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_EXTERNAL_TABLE.html for loading Parquet into Redshift via Spectrum\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install -q SQLAlchemy==1.3.13"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sqlalchemy import create_engine\n",
    "from sqlalchemy.orm import sessionmaker\n",
    "import pandas as pd"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Redshift configuration\n",
    "redshift_username = 'dsoaws'\n",
    "redshift_pw = ''\n",
    "\n",
    "# TODO: get Endpoint/Host name programatically\n",
    "redshift_host = ''\n",
    "redshift_database = 'dsoaws'\n",
    "redshift_port = '5439'\n",
    "\n",
    "schema = 'redshift'\n",
    "table_name_tsv = 'amazon_reviews_tsv'\n",
    "table_name_parquet = 'amazon_reviews_parquet'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Connect to Redshift database engine\n",
    "engine = create_engine('postgresql://{}:{}@{}:{}/{}'.format(redshift_username, redshift_pw, redshift_host, redshift_port, redshift_database))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Configure Session\n",
    "session = sessionmaker()\n",
    "session.configure(bind=engine)\n",
    "s = session()\n",
    "#set_path = \"SET search_path TO %s\" % schema\n",
    "s.execute(set_path)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Load Parquet Data From S3 Into Redshift"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create Parquet Table In Redshift"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "statement = \"\"\"DROP TABLE IF EXISTS {}\"\"\".format(table_name_parquet)\n",
    "\n",
    "s = session()\n",
    "s.execute(statement)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#          product_category varchar(24)\n",
    "statement = \"\"\"CREATE TABLE {}( \n",
    "         marketplace varchar(2),\n",
    "         customer_id varchar(8),\n",
    "         review_id varchar(14),\n",
    "         product_id varchar(10),\n",
    "         product_parent varchar(9),\n",
    "         product_title varchar(400),\n",
    "         star_rating int,\n",
    "         helpful_votes int,\n",
    "         total_votes int,\n",
    "         vine varchar(1),\n",
    "         verified_purchase varchar(1),\n",
    "         review_headline varchar(128),\n",
    "         review_body varchar(65535),\n",
    "         review_date varchar(10),\n",
    "         year int\n",
    ")\"\"\".format(table_name_parquet)\n",
    "\n",
    "print(statement)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "s.execute(statement)\n",
    "s.commit()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# \"/\" trainling slash in FROM Path is required not to pick up another directory in S3 path\n",
    "# NOTE: this does not work -- use Spectrum to load parquet data\n",
    "\n",
    "statement = \"\"\"COPY {}\n",
    "                FROM '{}/'\n",
    "                IAM_ROLE '{}' \n",
    "                FORMAT AS PARQUET;\"\"\".format(table_name_parquet, s3_destination_path_parquet, IAM_ROLE)\n",
    "print(statement)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "s = session()\n",
    "s.execute(statement)\n",
    "s.commit()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Run another sample query"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = pd.read_sql_query(\"\"\"SELECT product_id, product_title, count(*) AS count_reviews\n",
    "                            FROM \\'{}\\'\n",
    "                            WHERE product_category=\\'Digital_Video_Download\\'\n",
    "                            GROUP BY product_id, product_title\n",
    "                            ORDER BY count_reviews DESC\"\"\".format(table_name_parquet), engine)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df.head(5)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_python3",
   "language": "python",
   "name": "conda_python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
